Aggregation
doesn’t only apply to numeric values. It’s a more general pattern that
arises in many application contexts. The following example shows how a
variation of parallel aggregation known as map/reduce is used to
aggregate nonscalar data types.
The example is of a social
network service, where subscribers can designate other subscribers as
friends. The site recommends new friends to each subscriber by
identifying other subscribers who are friends of friends. To limit the
number of recommendations, the service only recommends the candidates
who have the largest number of mutual friends. Candidates can be
identified in independent parallel operations, and then candidates are
ranked and selected in an aggregation operation.
Here’s how the
recommendation service’s data structures and algorithms
work. Subscribers are identified by integer ID numbers. A subscriber’s
friends are represented by the collection of their IDs. The collection
is a set because each element (a friend’s ID number) occurs only once
and the order of the elements doesn’t matter. For example, the
subscriber whose ID is 0 has two friends whose IDs are 1 and 2. This
can be written as:
0 -> { 1, 2 }
The social network
repository stores an entry like this for every subscriber. In order to
recommend friends to a subscriber, the recommendation service must
consider a subscriber’s entry, as well as the entries for all of that
subscriber’s friends. For example, to recommend friends for subscriber
0, the pertinent entries in the repository are:
0 -> { 1, 2 }
1 -> { 0, 2, 3 }
2 -> { 0, 1, 3, 4 }
You can see that the
service should recommend subscribers 3 and 4 to subscriber 0 because
they appear among the friends of subscribers 1 and 2, who are already
friends of 0. In addition, the recommendation service should rank
subscriber 3 higher than 4, because 3 is a friend of both of 0’s
friends, while 4 is a friend of only one of them. You can write the
results like this:
{ 3(2), 4(1) }
This means that
subscriber 3 shares two mutual friends with subscriber 0, and
subscriber 4 shares one. This is an example of a type of collection
known as a multiset. In a multiset, each element (3 and 4 in this example) is associated with a multiplicity,
which is the number of times it occurs in the collection (2 and 1,
respectively). So a multiset is a collection where each element only
occurs once, yet it can represent duplicates (or larger
multiplicities). The order of elements in a multiset doesn’t matter.
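One way to picture a multiset in code is a dictionary that maps each element to its multiplicity. The following is a minimal sketch under that assumption; the MultisetSketch class and its FromList and Format helpers are hypothetical names chosen for illustration, not the IDMultiset type that the recommendation service uses.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class MultisetSketch
{
    // Collapses a list that may contain duplicates into a multiset:
    // each distinct ID becomes a key, and its value counts the occurrences.
    public static Dictionary<int, int> FromList(IEnumerable<int> ids)
    {
        var multiset = new Dictionary<int, int>();
        foreach (int id in ids)
        {
            // count is 0 when the ID has not been seen yet.
            multiset.TryGetValue(id, out int count);
            multiset[id] = count + 1;
        }
        return multiset;
    }

    // Formats a multiset in the notation used in the text.
    // Keys are sorted only to make the output deterministic;
    // the multiset itself has no inherent order.
    public static string Format(Dictionary<int, int> multiset)
    {
        var items = multiset.OrderBy(kv => kv.Key)
                            .Select(kv => $"{kv.Key}({kv.Value})");
        return "{ " + string.Join(", ", items) + " }";
    }
}
```

For the candidate list 3, 3, 4, the call MultisetSketch.Format(MultisetSketch.FromList(new[] { 3, 3, 4 })) yields { 3(2), 4(1) }, which matches the result shown for subscriber 0.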
The recommendation service uses map/reduce, which has several phases. In the first phase, which is the map
phase, the service creates a collection of candidates that can contain
duplicates — the same candidate’s ID can occur several times in the
list (once for each mutual friend). In the second phase, which is the reduce
phase, the service aggregates this collection to create a multiset
where each candidate’s ID occurs only once, but is associated with its
multiplicity in the first collection (the number of mutual friends).
There is also a postprocessing phase where the service ranks candidates
by sorting them according to their multiplicity and selects only the
candidates with the largest multiplicities.
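The postprocessing phase can be sketched as a sort followed by a truncation. The code below is an illustration only: it assumes the multiset is held as a dictionary from candidate ID to multiplicity, and the RankingSketch class and its tie-breaking rule (lower ID first) are assumptions made for this example, not a description of the service's own helper.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class RankingSketch
{
    // Returns up to maxItems (ID, multiplicity) pairs, largest multiplicity first.
    // Ties are broken by the lower ID so that the result is deterministic.
    public static List<KeyValuePair<int, int>> MostNumerous(
        Dictionary<int, int> multiset, int maxItems)
    {
        return multiset
            .OrderByDescending(item => item.Value)
            .ThenBy(item => item.Key)
            .Take(maxItems)
            .ToList();
    }
}
```

Because the ranking needs every multiplicity to be final, it runs only after the reduce phase has produced its single multiset.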
An important feature of
map/reduce is that the result of the map stage is a collection of items
that is compatible with the reduce stage. The reduce stage uses multisets; therefore, the map stage does not produce a plain list of candidate IDs. Instead, it produces a collection of multisets,
one for each of the subscriber’s friends, in which every candidate found through that friend has a multiplicity of
one. In this example, the output of the map stage is a collection of
two multisets, and the candidates are subscribers 3 and 4:
{ 3(1) }, { 3(1), 4(1) }
Here, the first multiset contains the candidates found among the friends of subscriber 1, and the second contains the candidates found among the friends of subscriber 2.
Another important feature of map/reduce is that the aggregation
in the reduce phase is performed by applying a binary operation to
pairs of elements from the collection that is produced by the map
phase. In this example, the operation is a multiset union,
which combines two multisets by collecting the elements and adding
their multiplicities. The result of applying the multiset union
operation to the two multisets in the preceding collection is:
{ 3(2), 4(1) }
Now that there is only one
multiset, the reduce phase is complete. By repeatedly applying the
multiset union operation, the reduce phase can aggregate any collection
of multisets, no matter how large, into one multiset.
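The multiset union described above can be sketched as follows, again assuming a dictionary from element to multiplicity. The MultisetUnionSketch class and its method names are hypothetical and are not the service's Multiset.Union method.

```csharp
using System;
using System.Collections.Generic;

public static class MultisetUnionSketch
{
    // Returns a new multiset in which each element's multiplicity is the
    // sum of its multiplicities in a and b. Neither input is modified.
    public static Dictionary<int, int> Union(
        Dictionary<int, int> a, Dictionary<int, int> b)
    {
        var result = new Dictionary<int, int>(a);
        foreach (KeyValuePair<int, int> item in b)
        {
            // count is 0 when the element occurs only in b.
            result.TryGetValue(item.Key, out int count);
            result[item.Key] = count + item.Value;
        }
        return result;
    }

    // Reduces any number of multisets to one by repeated pairwise union.
    public static Dictionary<int, int> UnionAll(
        IEnumerable<Dictionary<int, int>> multisets)
    {
        var total = new Dictionary<int, int>();
        foreach (Dictionary<int, int> multiset in multisets)
        {
            total = Union(total, multiset);
        }
        return total;
    }
}
```

Applied to { 3(1) } and { 3(1), 4(1) }, Union produces a multiset in which 3 has multiplicity 2 and 4 has multiplicity 1.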
Here is the code for the sequential version.
public IDMultisetItemList PotentialFriendsSequential(
    SubscriberID id,
    int maxCandidates)
{
    // Map
    var foafsList = new List<IDMultiset>();
    foreach (SubscriberID friend in subscribers[id].Friends)
    {
        var foafs = subscribers[friend].FriendsCopy();
        foafs.RemoveWhere(foaf => foaf == id ||
            subscribers[id].Friends.Contains(foaf));
        foafsList.Add(Multiset.Create(foafs));
    }

    // Reduce
    IDMultiset candidates = new IDMultiset();
    foreach (IDMultiset foafs in foafsList)
    {
        candidates = Multiset.Union(foafs, candidates);
    }

    // Postprocess
    return Multiset.MostNumerous(candidates, maxCandidates);
}
In the map phase, this code loops sequentially over the subscriber’s friends and builds a collection of multisets of candidates (a foaf is a friend of a friend). In the reduce phase, the code loops sequentially over those multisets
and aggregates them with the multiset union operation. If this code
executes with the few subscribers in the example, the id argument is 0
and subscribers[id].Friends is { 1, 2 }. When the map phase completes, foafsList is { 3(1) }, { 3(1), 4(1) }, and when the reduce phase completes, candidates is { 3(2), 4(1) }.
Multiset union is commutative:
the result does not depend on the order of its arguments. Multiset
union is also associative: if you aggregate several multisets into one
by successively forming unions in a pair-wise manner, the final result
does not depend on the order of the union operations. Both properties matter. If the
aggregation operation is not associative, the aggregation can’t be performed in parallel
without potentially getting different results. If it’s not commutative,
the potential for parallelism is greatly reduced.
Note:
Strictly speaking,
floating-point addition is not associative, because each intermediate sum is rounded. From
run to run, parallel computations over floats or doubles may end up
with slightly different results.
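The effect of grouping on floating-point sums can be seen with three values. The sketch below uses hypothetical helper names and simply adds the same doubles with two different groupings, which is what a parallel reduction may do from one run to the next.

```csharp
using System;

public static class FloatGroupingSketch
{
    // Sums three doubles, grouping the first two together.
    public static double LeftGrouped(double a, double b, double c)
    {
        return (a + b) + c;
    }

    // Sums the same three doubles, grouping the last two together.
    public static double RightGrouped(double a, double b, double c)
    {
        return a + (b + c);
    }
}
```

With a = 1e20, b = -1e20, and c = 1.0, LeftGrouped returns 1 and RightGrouped returns 0, because 1.0 is too small to change -1e20 when the two are added first.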
Here’s how to use PLINQ to apply map/reduce to the social networking example.
public IDMultisetItemList PotentialFriendsPLinq(SubscriberID id,
                                                int maxCandidates)
{
    var candidates =
        subscribers[id].Friends.AsParallel()
        .SelectMany(friend => subscribers[friend].Friends)
        .Where(foaf => foaf != id &&
               !(subscribers[id].Friends.Contains(foaf)))
        .GroupBy(foaf => foaf)
        .Select(foafGroup => new IDMultisetItem(foafGroup.Key,
                                                foafGroup.Count()));
    return Multiset.MostNumerous(candidates, maxCandidates);
}
Recall that in map/reduce, independent parallel operations (the map
phase) are followed by aggregation (the reduce phase). In the map
phase, the parallel operations iterate over all the friends of
the subscriber (subscriber 0 in the example). The map phase is performed by the SelectMany method, which finds all the friends of each friend of the subscriber, and the Where
method, which prevents redundant recommendations by removing the
subscriber and the subscriber’s own friends. The output of the map
phase is a collection of candidate IDs, including duplicates. The
reduce phase is performed by the GroupBy method, which collects duplicate candidate IDs into groups, and the Select method, which converts each group into a multiset item that associates the candidate ID with a multiplicity (or Count). The return statement performs the final postprocessing step that selects the candidates with the highest multiplicities.
When map/reduce is implemented with PLINQ, it need not be a line-by-line translation of the foreach version. In the PLINQ example, the output of the map stage is not a collection of multisets, but a collection with duplicates. The multiset is not formed until the reduce stage.
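To experiment with the PLINQ approach without the service's own types, the same query shape can be written against plain collections. The following is a sketch under stated assumptions: the repository is a dictionary from subscriber ID to a set of friend IDs, it is not modified while the query runs, and the FriendRecommenderSketch class, its method names, and the tie-breaking rule are hypothetical. The sample repository holds only the three entries given in the text.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class FriendRecommenderSketch
{
    // Hypothetical in-memory repository: subscriber ID -> set of friend IDs.
    public static Dictionary<int, HashSet<int>> SampleRepository()
    {
        return new Dictionary<int, HashSet<int>>
        {
            [0] = new HashSet<int> { 1, 2 },
            [1] = new HashSet<int> { 0, 2, 3 },
            [2] = new HashSet<int> { 0, 1, 3, 4 },
        };
    }

    // Returns up to maxCandidates (candidate ID, mutual friend count) pairs,
    // most mutual friends first; ties are broken by the lower ID.
    public static List<KeyValuePair<int, int>> PotentialFriends(
        Dictionary<int, HashSet<int>> subscribers, int id, int maxCandidates)
    {
        HashSet<int> friends = subscribers[id];
        return friends.AsParallel()
            // Map: every friend of every friend, duplicates included,
            // minus the subscriber and the subscriber's existing friends.
            .SelectMany(friend => subscribers[friend])
            .Where(foaf => foaf != id && !friends.Contains(foaf))
            // Reduce: one (ID, multiplicity) pair per distinct candidate.
            .GroupBy(foaf => foaf)
            .Select(g => new KeyValuePair<int, int>(g.Key, g.Count()))
            // Postprocess: rank by multiplicity and truncate.
            .OrderByDescending(item => item.Value)
            .ThenBy(item => item.Key)
            .Take(maxCandidates)
            .ToList();
    }
}
```

For subscriber 0 and the sample repository, the result lists candidate 3 with two mutual friends, followed by candidate 4 with one.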
The online source code for this example also includes map/reduce implemented with the Parallel.ForEach method.
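The online version is not reproduced here. As a rough idea of how such an implementation could look, the sketch below uses the Parallel.ForEach overload that takes task-local state: each task counts candidates in its own multiset, and the local multisets are merged under a lock when the tasks finish. The ParallelForEachSketch class, the dictionary-based repository, and the method name are assumptions made for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ParallelForEachSketch
{
    // Returns a multiset (candidate ID -> number of mutual friends) for the
    // given subscriber. Assumes the repository is not modified during the call.
    public static Dictionary<int, int> CountMutualFriends(
        Dictionary<int, HashSet<int>> subscribers, int id)
    {
        HashSet<int> friends = subscribers[id];
        var total = new Dictionary<int, int>();
        object totalLock = new object();

        Parallel.ForEach(
            friends,
            // Each worker task starts with its own empty multiset.
            () => new Dictionary<int, int>(),
            // Map plus local reduce: count this friend's candidates
            // in the task-local multiset, without any locking.
            (friend, loopState, local) =>
            {
                foreach (int foaf in subscribers[friend])
                {
                    if (foaf == id || friends.Contains(foaf)) continue;
                    local.TryGetValue(foaf, out int count);
                    local[foaf] = count + 1;
                }
                return local;
            },
            // Final reduce: merge each task's multiset into the shared total.
            local =>
            {
                lock (totalLock)
                {
                    foreach (KeyValuePair<int, int> item in local)
                    {
                        total.TryGetValue(item.Key, out int count);
                        total[item.Key] = count + item.Value;
                    }
                }
            });

        return total;
    }
}
```

Because the merge step is a multiset union, which is commutative and associative, the order in which the tasks finish does not affect the final counts.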